<!DOCTYPE html>
<html lang="en">

<head>
	<meta charset="UTF-8">
	<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
	<meta name="keywords" content="fescar, seata, distributed transaction" />
	<meta name="description" content="seata-sourcecode-server-bootstrap" />
	<!-- Page tab title -->
	<title>Distributed Transaction Seata Source Code: Server Startup Process</title>
  <link rel="shortcut icon" href="/img/seata_logo_small.jpeg"/>
	<link rel="stylesheet" href="/build/blogDetail.css" />
</head>
<body>
	<div id="root"><div class="blog-detail-page" data-reactroot=""><header class="header-container header-container-normal"><div class="header-body"><a href="/zh-cn/index.html"><img class="logo" src="//img.alicdn.com/tfs/TB1gqL1w4D1gK0jSZFyXXciOVXa-1497-401.png"/></a><div class="search search-normal"><span class="icon-search"></span></div><span class="language-switch language-switch-normal">En</span><div class="header-menu"><img class="header-menu-toggle" src="https://img.alicdn.com/tfs/TB14eEmw7P2gK0jSZPxXXacQpXa-38-32.png"/><ul><li class="menu-item menu-item-normal"><a href="/zh-cn/index.html" target="_self">Home</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/docs/overview/what-is-seata.html" target="_self">Docs</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/docs/developers/developers_dev.html" target="_self">Developers</a></li><li class="menu-item menu-item-normal menu-item-normal-active"><a href="/zh-cn/blog/index.html" target="_self">Blog</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/community/index.html" target="_self">Community</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/blog/download.html" target="_self">Download</a></li></ul></div></div></header><section class="blog-content markdown-body"><h2>[Seata Distributed Transaction Source Code Walkthrough, Part 1] Server Startup Process</h2>
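<h3>Key points in implementing distributed transactions:</h3>
<ol>
<li>Transaction persistence: every state of the transaction and of each participant must be persisted, so that when an instance crashes the transaction can still be rolled back or committed from the persisted data, achieving eventual consistency.</li>
<li>Scheduled handling of timed-out, unfinished transactions (continuing to attempt the commit or rollback), i.e. achieving eventual consistency through a retry mechanism.</li>
<li>Propagation across service instances: when a distributed transaction spans multiple instances, the transaction must be propagated between them, which usually requires adapting to different RPC frameworks.</li>
<li>Isolation level: for performance, most distributed transaction implementations default to read uncommitted.</li>
<li>Idempotency: XA and Seata's AT mode already provide idempotency by default, whereas interface-level modes such as TCC and Saga still require business developers to implement idempotency themselves.</li>
</ol>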
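<p>To make point 5 concrete, here is a minimal sketch of an idempotent confirm step for a TCC-style participant. The class and method names are hypothetical and not part of Seata, and the in-memory set is an assumption made for brevity; a real participant would record the processed branch in the same local database transaction as the business change.</p>

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical TCC participant used only for illustration; not a Seata class. */
final class IdempotentConfirmSketch {
    // Keys of the branches whose confirm step has already been applied.
    private final Set<String> confirmed = ConcurrentHashMap.newKeySet();
    private final AtomicInteger balance = new AtomicInteger();

    /** Applies the confirm step once per (xid, branchId); a repeated call is a no-op. */
    boolean confirm(String xid, long branchId, int amount) {
        // add() returns false when the key is already present, i.e. this is a retry.
        if (!confirmed.add(xid + ":" + branchId)) {
            return false;
        }
        balance.addAndGet(amount);
        return true;
    }

    int balance() {
        return balance.get();
    }

    public static void main(String[] args) {
        IdempotentConfirmSketch account = new IdempotentConfirmSketch();
        account.confirm("demo-xid", 1L, 100);
        account.confirm("demo-xid", 1L, 100); // retried delivery of the same branch
        System.out.println(account.balance()); // prints 100
    }
}
```

<p>Because the coordinator retries unfinished transactions, the same confirm request can arrive more than once; deduplicating on the transaction and branch identifiers keeps the business effect from being applied twice.</p>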
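<p>This article walks through the seata-server source code from the perspective of its startup process. The startup flow chart is shown below:</p>
<p><img src="https://img-blog.csdnimg.cn/20200726213919467.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTE0NTg0OA==,size_16,color_FFFFFF,t_70" alt="seata-server startup flow chart"></p>
<h4>1. The startup class Server</h4>
<p>The entry point of seata-server is the main method of the Server class. The source code is as follows:</p>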
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String[] args)</span> <span class="hljs-keyword">throws</span> IOException </span>{
    <span class="hljs-comment">// Read the listen port from environment variables or runtime arguments; the default port is 8091</span>
    <span class="hljs-keyword">int</span> port = PortHelper.getPort(args);
    
    <span class="hljs-comment">// Store the listen port in a system property. SystemPropertyLoggerContextListener,</span>
    <span class="hljs-comment">// an implementation of Logback's LoggerContextListener, writes the port into the Logback context,</span>
    <span class="hljs-comment">// and logback.xml uses the port variable to build the log file name.</span>
    System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

    <span class="hljs-comment">// Create the Logger</span>
    <span class="hljs-keyword">final</span> Logger logger = LoggerFactory.getLogger(Server.class);
    <span class="hljs-keyword">if</span> (ContainerHelper.isRunningInContainer()) {
        logger.info(<span class="hljs-string">"The server is running in container."</span>);
    }

    <span class="hljs-comment">// Parse the startup arguments and the settings from the configuration files</span>
    ParameterParser parameterParser = <span class="hljs-keyword">new</span> ParameterParser(args);

    <span class="hljs-comment">// Metrics: the Registry instance is obtained through the SPI mechanism</span>
    MetricsManager.get().init();
    
    <span class="hljs-comment">// Write the storeMode read from the configuration into a system property so that other classes can use it.</span>
    System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
    
    <span class="hljs-comment">// Create the NettyRemotingServer instance. NettyRemotingServer is a Netty-based RPC server that</span>
    <span class="hljs-comment">// handles network communication with the TM and RM in the client SDK. It is not initialized yet.</span>
    NettyRemotingServer nettyRemotingServer = <span class="hljs-keyword">new</span> NettyRemotingServer(WORKING_THREADS);
    
    <span class="hljs-comment">// Set the listen port</span>
    nettyRemotingServer.setListenPort(parameterParser.getPort());
    
    <span class="hljs-comment">// Initialize UUIDGenerator, which is based on the snowflake algorithm</span>
    <span class="hljs-comment">// and generates the IDs of global and branch transactions.</span>
    <span class="hljs-comment">// Each Server instance is configured with a different ServerNode to keep the IDs unique.</span>
    UUIDGenerator.init(parameterParser.getServerNode());
    
    <span class="hljs-comment">// SessionHolder is responsible for persisting the transaction log (state).</span>
    <span class="hljs-comment">// The file, db, and redis store modes are currently supported; cluster deployments must use db or redis.</span>
    SessionHolder.init(parameterParser.getStoreMode());
    
    <span class="hljs-comment">// Create and initialize the DefaultCoordinator instance. DefaultCoordinator is the TC's core</span>
    <span class="hljs-comment">// transaction-processing class; underneath it handles the logic of the AT, TCC, SAGA, and other modes.</span>
    DefaultCoordinator coordinator = <span class="hljs-keyword">new</span> DefaultCoordinator(nettyRemotingServer);
    coordinator.init();
    nettyRemotingServer.setHandler(coordinator);
    <span class="hljs-comment">// register ShutdownHook</span>
    ShutdownHook.getInstance().addDisposable(coordinator);
    ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

    <span class="hljs-comment">// 127.0.0.1 and 0.0.0.0 are not valid here.</span>
    <span class="hljs-keyword">if</span> (NetUtil.isValidIp(parameterParser.getHost(), <span class="hljs-keyword">false</span>)) {
        XID.setIpAddress(parameterParser.getHost());
    } <span class="hljs-keyword">else</span> {
        XID.setIpAddress(NetUtil.getLocalIp());
    }
    XID.setPort(nettyRemotingServer.getListenPort());

    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// Initialize Netty, start listening on the port, and block here until the program shuts down</span>
        nettyRemotingServer.init();
    } <span class="hljs-keyword">catch</span> (Throwable e) {
        logger.error(<span class="hljs-string">"nettyServer init error:{}"</span>, e.getMessage(), e);
        System.exit(-<span class="hljs-number">1</span>);
    }

    System.exit(<span class="hljs-number">0</span>);
}
</code></pre>
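<p>The first step of main, choosing a listen port and publishing it as a system property, can be sketched as follows. This is not the code of PortHelper: the helper name, the precedence (environment value first, then a <code>-p</code> argument, then 8091), the <code>SEATA_PORT</code> variable name, and the <code>demo.server.port</code> property key are all assumptions made for illustration.</p>

```java
/** Hypothetical port resolution; a sketch, not Seata's PortHelper. */
final class PortResolutionSketch {
    static final int DEFAULT_PORT = 8091;

    /** Assumed precedence: environment value, then a "-p" argument, then the default. */
    static int resolvePort(String[] args, String envPort) {
        if (envPort != null && !envPort.trim().isEmpty()) {
            return Integer.parseInt(envPort.trim());
        }
        // Look for "-p <port>"; the loop bound guarantees that args[i + 1] exists.
        for (int i = 0; i + 1 < args.length; i++) {
            if ("-p".equals(args[i])) {
                return Integer.parseInt(args[i + 1]);
            }
        }
        return DEFAULT_PORT;
    }

    public static void main(String[] args) {
        // SEATA_PORT and demo.server.port are assumed names used only in this demo.
        int port = resolvePort(args, System.getenv("SEATA_PORT"));
        // Publishing the value early lets later components (e.g. logging setup) read it.
        System.setProperty("demo.server.port", Integer.toString(port));
        System.out.println("resolved port: " + port);
    }
}
```

<p>Resolving the port before the logger is created matters in the original code because the log file name is derived from that system property.</p>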
<h4>2. Parsing the configuration</h4>
<p>Argument parsing is implemented in the ParameterParser class. The source of its init method is as follows:</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">(String[] args)</span> </span>{
   <span class="hljs-keyword">try</span> {
       <span class="hljs-comment">// Check whether the server runs in a container; if so, read the settings from environment variables</span>
       <span class="hljs-keyword">if</span> (ContainerHelper.isRunningInContainer()) {
           <span class="hljs-keyword">this</span>.seataEnv = ContainerHelper.getEnv();
           <span class="hljs-keyword">this</span>.host = ContainerHelper.getHost();
           <span class="hljs-keyword">this</span>.port = ContainerHelper.getPort();
           <span class="hljs-keyword">this</span>.serverNode = ContainerHelper.getServerNode();
           <span class="hljs-keyword">this</span>.storeMode = ContainerHelper.getStoreMode();
       } <span class="hljs-keyword">else</span> {
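           <span class="hljs-comment">// Use JCommander to read the arguments supplied when the application was started.</span>
           <span class="hljs-comment">// JCommander assigns them to the fields of this class through annotations and reflection.</span>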
           JCommander jCommander = JCommander.newBuilder().addObject(<span class="hljs-keyword">this</span>).build();
           jCommander.parse(args);
           <span class="hljs-keyword">if</span> (help) {
               jCommander.setProgramName(PROGRAM_NAME);
               jCommander.usage();
               System.exit(<span class="hljs-number">0</span>);
           }
       }
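       <span class="hljs-comment">// serverNode identifies this instance in the snowflake algorithm and must be unique.</span>
       <span class="hljs-comment">// If it is not specified, one is generated from the current server's IP address.</span>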
       <span class="hljs-keyword">if</span> (<span class="hljs-keyword">this</span>.serverNode == <span class="hljs-keyword">null</span>) {
           <span class="hljs-keyword">this</span>.serverNode = IdWorker.initWorkerId();
       }
       <span class="hljs-keyword">if</span> (StringUtils.isNotBlank(seataEnv)) {
           System.setProperty(ENV_PROPERTY_KEY, seataEnv);
       }
       <span class="hljs-keyword">if</span> (StringUtils.isBlank(storeMode)) {
           <span class="hljs-comment">// This involves an important class, Configuration. ParameterParser only obtains core parameters</span>
           <span class="hljs-comment">// such as ip, port, and storeMode; all other parameters are read from Configuration.</span>
           <span class="hljs-comment">// If storeMode was not given as a startup argument, it is read from Configuration here.</span>
           storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
               SERVER_DEFAULT_STORE_MODE);
       }
   } <span class="hljs-keyword">catch</span> (ParameterException e) {
       printError(e);
   }

}
</code></pre>
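<p>The init method of ParameterParser is where ConfigurationFactory.getInstance() is called for the first time, which initializes a singleton Configuration object. Configuration is responsible for loading all the other configuration parameters. The Seata Server source tree contains two configuration files, file.conf and registry.conf. How do they differ, and are both required? Let's keep reading the code.</p>
<p>ConfigurationFactory.getInstance simply returns a singleton; the core logic lives in buildConfiguration. Before buildConfiguration runs, however, a static block in the ConfigurationFactory class is executed first.</p>
<pre><code class="language-java"><span class="hljs-comment">// Get the singleton Configuration object</span>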
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> Configuration <span class="hljs-title">getInstance</span><span class="hljs-params">()</span> </span>{
    <span class="hljs-keyword">if</span> (instance == <span class="hljs-keyword">null</span>) {
        <span class="hljs-keyword">synchronized</span> (Configuration.class) {
            <span class="hljs-keyword">if</span> (instance == <span class="hljs-keyword">null</span>) {
                instance = buildConfiguration();
            }
        }
    }
    <span class="hljs-keyword">return</span> instance;
}

<span class="hljs-comment">// The static block of ConfigurationFactory</span>
<span class="hljs-keyword">static</span> {
    <span class="hljs-comment">// Get the name of the configuration file; the default is registry.conf</span>
    String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
    <span class="hljs-keyword">if</span> (seataConfigName == <span class="hljs-keyword">null</span>) {
        seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
    }
    <span class="hljs-keyword">if</span> (seataConfigName == <span class="hljs-keyword">null</span>) {
        seataConfigName = REGISTRY_CONF_PREFIX;
    }
    String envValue = System.getProperty(ENV_PROPERTY_KEY);
    <span class="hljs-keyword">if</span> (envValue == <span class="hljs-keyword">null</span>) {
        envValue = System.getenv(ENV_SYSTEM_KEY);
    }
    
    <span class="hljs-comment">// Read registry.conf and build the base Configuration object</span>
    Configuration configuration = (envValue == <span class="hljs-keyword">null</span>) ? <span class="hljs-keyword">new</span> FileConfiguration(seataConfigName + REGISTRY_CONF_SUFFIX,
        <span class="hljs-keyword">false</span>) : <span class="hljs-keyword">new</span> FileConfiguration(seataConfigName + <span class="hljs-string">"-"</span> + envValue + REGISTRY_CONF_SUFFIX, <span class="hljs-keyword">false</span>);
    Configuration extConfiguration = <span class="hljs-keyword">null</span>;
    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// ExtConfigurationProvider currently has a single implementation, SpringBootConfigurationProvider,</span>
        <span class="hljs-comment">// which supports the Spring Boot configuration style of the client SDK; the Server can ignore this logic.</span>
        extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
        <span class="hljs-keyword">if</span> (LOGGER.isInfoEnabled()) {
            LOGGER.info(<span class="hljs-string">"load Configuration:{}"</span>, extConfiguration == <span class="hljs-keyword">null</span> ? configuration.getClass().getSimpleName()
                : extConfiguration.getClass().getSimpleName());
        }
    } <span class="hljs-keyword">catch</span> (EnhancedServiceNotFoundException ignore) {

    } <span class="hljs-keyword">catch</span> (Exception e) {
        LOGGER.error(<span class="hljs-string">"failed to load extConfiguration:{}"</span>, e.getMessage(), e);
    }
    CURRENT_FILE_INSTANCE = extConfiguration == <span class="hljs-keyword">null</span> ? configuration : extConfiguration;
}
</code></pre>
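<p>The static block in ConfigurationFactory reads its settings from registry.conf. registry.conf mainly holds two groups of settings, the <strong>registry</strong> and the <strong>configuration source</strong>. The <strong>configuration source</strong> specifies whether the more detailed settings come from file.conf or from another source such as apollo. registry.conf is therefore required: it names the source of all other detailed configuration, and the currently supported sources include file, zk, apollo, nacos, and etcd3. file.conf, in contrast, is optional; it is read only when the configuration source is set to the file type.</p>
<p>Next, buildConfiguration in ConfigurationFactory loads the remaining settings from the configuration source specified in registry.conf.</p>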
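<p>The file-name selection performed by the static block can be summarized in a small sketch. The class and method names are hypothetical, and the literal values <code>registry</code> and <code>.conf</code> are assumed to match REGISTRY_CONF_PREFIX and REGISTRY_CONF_SUFFIX.</p>

```java
/** Hypothetical summary of how the bootstrap configuration file name is chosen. */
final class RegistryFileNameSketch {
    static final String DEFAULT_PREFIX = "registry"; // assumed value of REGISTRY_CONF_PREFIX
    static final String SUFFIX = ".conf";            // assumed value of REGISTRY_CONF_SUFFIX

    /**
     * @param configName custom base name from a system property or environment variable, or null
     * @param env        environment name such as "test", or null when none is set
     */
    static String resolve(String configName, String env) {
        String base = (configName == null) ? DEFAULT_PREFIX : configName;
        // With an environment name the file becomes <base>-<env>.conf.
        return (env == null) ? base + SUFFIX : base + "-" + env + SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null));   // prints registry.conf
        System.out.println(resolve(null, "test")); // prints registry-test.conf
    }
}
```

<p>This is why setting an environment name lets one deployment keep several bootstrap files side by side, for example one per environment.</p>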
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> Configuration <span class="hljs-title">buildConfiguration</span><span class="hljs-params">()</span> </span>{
    ConfigType configType;
    String configTypeName;
    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// Read the config.type value from registry.conf and parse it into the ConfigType enum</span>
        configTypeName = CURRENT_FILE_INSTANCE.getConfig(
            ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                + ConfigurationKeys.FILE_ROOT_TYPE);

        <span class="hljs-keyword">if</span> (StringUtils.isBlank(configTypeName)) {
            <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> NotSupportYetException(<span class="hljs-string">"config type can not be null"</span>);
        }

        configType = ConfigType.getType(configTypeName);
    } <span class="hljs-keyword">catch</span> (Exception e) {
        <span class="hljs-keyword">throw</span> e;
    }
    Configuration extConfiguration = <span class="hljs-keyword">null</span>;
    Configuration configuration;
    <span class="hljs-keyword">if</span> (ConfigType.File == configType) {
        <span class="hljs-comment">// If the configuration type is file, read the config.file.name entry from registry.conf,</span>
        <span class="hljs-comment">// i.e. the path of the file-type configuration file; in the sample it defaults to file.conf</span>
        String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
            ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
        String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
        
        <span class="hljs-comment">// Build a FileConfiguration object from the path of the configuration file</span>
        configuration = <span class="hljs-keyword">new</span> FileConfiguration(name);
        <span class="hljs-keyword">try</span> {
            <span class="hljs-comment">// Extra extension of the configuration; again it only takes effect for the client Spring Boot SDK</span>
            extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
            <span class="hljs-keyword">if</span> (LOGGER.isInfoEnabled()) {
                LOGGER.info(<span class="hljs-string">"load Configuration:{}"</span>, extConfiguration == <span class="hljs-keyword">null</span>
                    ? configuration.getClass().getSimpleName() : extConfiguration.getClass().getSimpleName());
            }
        } <span class="hljs-keyword">catch</span> (EnhancedServiceNotFoundException ignore) {

        } <span class="hljs-keyword">catch</span> (Exception e) {
            LOGGER.error(<span class="hljs-string">"failed to load extConfiguration:{}"</span>, e.getMessage(), e);
        }
    } <span class="hljs-keyword">else</span> {
        <span class="hljs-comment">// If the configuration type is not file (e.g. nacos or zk),</span>
        <span class="hljs-comment">// load the corresponding ConfigurationProvider through SPI</span>
        configuration = EnhancedServiceLoader
            .load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
    }
    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// ConfigurationCache wraps the Configuration in a proxy that caches values in memory, speeding up lookups</span>
        Configuration configurationCache;
        <span class="hljs-keyword">if</span> (<span class="hljs-keyword">null</span> != extConfiguration) {
            configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
        } <span class="hljs-keyword">else</span> {
            configurationCache = ConfigurationCache.getInstance().proxy(configuration);
        }
        <span class="hljs-keyword">if</span> (<span class="hljs-keyword">null</span> != configurationCache) {
            extConfiguration = configurationCache;
        }
    } <span class="hljs-keyword">catch</span> (EnhancedServiceNotFoundException ignore) {

    } <span class="hljs-keyword">catch</span> (Exception e) {
        LOGGER.error(<span class="hljs-string">"failed to load configurationCacheProvider:{}"</span>, e.getMessage(), e);
    }
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span> == extConfiguration ? configuration : extConfiguration;
}
</code></pre>
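<p>The idea behind the caching layer at the end of buildConfiguration can be illustrated with a small read-through cache. This is only a sketch under simplifying assumptions: the ConfigSource interface is a hypothetical stand-in for Configuration, the wrapper is a plain decorator rather than a generated proxy, and cache invalidation on configuration changes is not modeled.</p>

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CachedConfigSketch {
    /** Hypothetical minimal stand-in for a configuration source. */
    interface ConfigSource {
        String getConfig(String key);
    }

    /** Wraps a source so that each key is read from the delegate at most once. */
    static ConfigSource cached(ConfigSource delegate) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        // computeIfAbsent stores nothing when the delegate returns null,
        // so missing keys are looked up again on the next call.
        return key -> cache.computeIfAbsent(key, delegate::getConfig);
    }

    public static void main(String[] args) {
        AtomicInteger reads = new AtomicInteger();
        ConfigSource slow = key -> {
            reads.incrementAndGet(); // stands for a file or network read
            return "store.mode".equals(key) ? "db" : null;
        };
        ConfigSource fast = cached(slow);
        fast.getConfig("store.mode");
        fast.getConfig("store.mode");
        System.out.println(reads.get()); // prints 1
    }
}
```

<p>Since configuration values are read on hot paths, serving repeated lookups from memory avoids going back to the file or the remote configuration center each time.</p>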
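<h4>3. Initializing UUIDGenerator</h4>
<p>UUIDGenerator is initialized with a serverNode argument. UUIDGenerator currently uses the snowflake algorithm to generate unique IDs, and serverNode ensures that the IDs generated by different seata-server instances do not collide.</p>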
<pre><code class="language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">UUIDGenerator</span> </span>{

    <span class="hljs-comment">/**
     * Generate uuid long.
     *
     * <span class="hljs-doctag">@return</span> the long
     */</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">long</span> <span class="hljs-title">generateUUID</span><span class="hljs-params">()</span> </span>{
        <span class="hljs-keyword">return</span> IdWorker.getInstance().nextId();
    }

    <span class="hljs-comment">/**
     * Init.
     *
     * <span class="hljs-doctag">@param</span> serverNode the server node id
     */</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">(Long serverNode)</span> </span>{
        IdWorker.init(serverNode);
    }
}
</code></pre>
<p>UUIDGenerator is a thin wrapper around IdWorker; the core logic for generating unique IDs is in the IdWorker class, which implements the snowflake algorithm. IdWorker is, once again, a singleton.</p>
<pre><code class="language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">IdWorker</span> </span>{

    <span class="hljs-comment">/**
     * Constructor
     *
     * <span class="hljs-doctag">@param</span> workerId the ServerNode mentioned above; valid values are 0 to 1023,
     *                 i.e. it occupies 10 bits of the 64-bit UUID
     */</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">IdWorker</span><span class="hljs-params">(<span class="hljs-keyword">long</span> workerId)</span> </span>{
        <span class="hljs-keyword">if</span> (workerId &gt; maxWorkerId || workerId &lt; <span class="hljs-number">0</span>) {
            <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalArgumentException(
                String.format(<span class="hljs-string">"worker Id can't be greater than %d or less than 0"</span>, maxWorkerId));
        }
        <span class="hljs-keyword">this</span>.workerId = workerId;
    }

    <span class="hljs-comment">/**
     * Get the next ID (the method is thread-safe)
     *
     * <span class="hljs-doctag">@return</span> SnowflakeId
     */</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">long</span> <span class="hljs-title">nextId</span><span class="hljs-params">()</span> </span>{
        <span class="hljs-keyword">long</span> timestamp = timeGen();

        <span class="hljs-keyword">if</span> (timestamp &lt; lastTimestamp) {
            <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> RuntimeException(String.format(
                <span class="hljs-string">"clock moved backwards.  Refusing to generate id for %d milliseconds"</span>, lastTimestamp - timestamp));
        }

        <span class="hljs-keyword">synchronized</span> (<span class="hljs-keyword">this</span>) {
            <span class="hljs-keyword">if</span> (lastTimestamp == timestamp) {
                sequence = (sequence + <span class="hljs-number">1</span>) &amp; sequenceMask;
                <span class="hljs-keyword">if</span> (sequence == <span class="hljs-number">0</span>) {
                    timestamp = tilNextMillis(lastTimestamp);
                }
            } <span class="hljs-keyword">else</span> {
                sequence = <span class="hljs-number">0L</span>;
            }
            lastTimestamp = timestamp;
        }
        <span class="hljs-comment">// Layout of the 64-bit snowflake ID: 1 leading bit fixed at 0 + 41-bit timestamp</span>
        <span class="hljs-comment">// + 10-bit workerId + 12-bit auto-increment sequence (incremented within the same timestamp)</span>
        <span class="hljs-keyword">return</span> ((timestamp - twepoch) &lt;&lt; timestampLeftShift) | (workerId &lt;&lt; workerIdShift) | sequence;
    }

    <span class="hljs-comment">// ... other members omitted</span>
}
</code></pre>
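<p>The bit layout described in the last comment of nextId (41-bit timestamp, 10-bit workerId, 12-bit sequence) can be checked with a small sketch that packs the three fields into a long and extracts them again. The class and method names are hypothetical, and the shift constants are derived from the bit widths stated above rather than copied from IdWorker.</p>

```java
/** Hypothetical illustration of the snowflake bit layout; not Seata's IdWorker. */
final class SnowflakeLayoutSketch {
    static final int SEQUENCE_BITS = 12;
    static final int WORKER_ID_BITS = 10;
    static final int WORKER_ID_SHIFT = SEQUENCE_BITS;                  // 12
    static final int TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS; // 22
    static final long MAX_WORKER_ID = (1L << WORKER_ID_BITS) - 1;      // 1023
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;       // 4095

    /** Packs milliseconds since the custom epoch, the worker id, and the sequence into one id. */
    static long compose(long elapsedMillis, long workerId, long sequence) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId out of range: " + workerId);
        }
        return (elapsedMillis << TIMESTAMP_SHIFT)
            | (workerId << WORKER_ID_SHIFT)
            | (sequence & SEQUENCE_MASK);
    }

    static long elapsedMillisOf(long id) {
        return id >>> TIMESTAMP_SHIFT;
    }

    static long workerIdOf(long id) {
        return (id >>> WORKER_ID_SHIFT) & MAX_WORKER_ID;
    }

    static long sequenceOf(long id) {
        return id & SEQUENCE_MASK;
    }

    public static void main(String[] args) {
        long id = compose(1000L, 5L, 7L);
        // prints 1000 5 7
        System.out.println(elapsedMillisOf(id) + " " + workerIdOf(id) + " " + sequenceOf(id));
    }
}
```

<p>Two instances with different worker ids can never produce the same value, because bits 12 to 21 of their IDs always differ; this is the reason each seata-server instance needs its own serverNode.</p>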
<h4>4. Initializing SessionHolder</h4>
<p>SessionHolder is responsible for persisting Sessions. Each Session object corresponds to one transaction, and there are two kinds: global transactions (GlobalSession) and branch transactions (BranchSession).
The init method shown below handles two persistence modes, file and db; db supports cluster deployment and is recommended. The four most important fields of SessionHolder are:</p>
<pre><code class="language-java"><span class="hljs-comment">// ROOT_SESSION_MANAGER is used to fetch all Sessions and to create, update, and delete Sessions.</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> SessionManager ROOT_SESSION_MANAGER;
<span class="hljs-comment">// Fetches and updates all Sessions that are committing asynchronously</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
<span class="hljs-comment">// Fetches and updates all Sessions whose commit needs to be retried</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> SessionManager RETRY_COMMITTING_SESSION_MANAGER;
<span class="hljs-comment">// Fetches and updates all Sessions whose rollback needs to be retried</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
</code></pre>
<p>The init method of SessionHolder:</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">(String mode)</span> <span class="hljs-keyword">throws</span> IOException </span>{
    <span class="hljs-keyword">if</span> (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
    }
    StoreMode storeMode = StoreMode.get(mode);
    <span class="hljs-keyword">if</span> (StoreMode.DB.equals(storeMode)) {
        <span class="hljs-comment">// SPI is used here again to load the SessionManager.</span>
        <span class="hljs-comment">// The four SessionManager instances obtained below are all instances of the same class,</span>
        <span class="hljs-comment">// DataBaseSessionManager; only the argument passed to its constructor differs.</span>
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            <span class="hljs-keyword">new</span> Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            <span class="hljs-keyword">new</span> Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            <span class="hljs-keyword">new</span> Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    } <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (StoreMode.FILE.equals(storeMode)) {
        <span class="hljs-comment">// file mode is not covered here</span>
        ...
    } <span class="hljs-keyword">else</span> {
        <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> IllegalArgumentException(<span class="hljs-string">"unknown store mode:"</span> + mode);
    }
    <span class="hljs-comment">// reload can be ignored in db mode</span>
    reload();
}
</code></pre>
<p>As shown above, the four SessionManagers in SessionHolder are all instances of DataBaseSessionManager that differ only in the constructor argument. Here is the relevant part of DataBaseSessionManager:</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">DataBaseSessionManager</span><span class="hljs-params">(String name)</span> </span>{
	<span class="hljs-keyword">super</span>();
	<span class="hljs-keyword">this</span>.taskName = name;
}

<span class="hljs-comment">// The taskName of the instance determines which transactions allSessions returns.</span>
<span class="hljs-comment">// For example, the instance whose taskName equals ASYNC_COMMITTING_SESSION_MANAGER_NAME</span>
<span class="hljs-comment">// returns all transactions in the AsyncCommitting status.</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Collection&lt;GlobalSession&gt; <span class="hljs-title">allSessions</span><span class="hljs-params">()</span> </span>{
	<span class="hljs-comment">// get by taskName</span>
	<span class="hljs-keyword">if</span> (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
			<span class="hljs-keyword">return</span> findGlobalSessions(<span class="hljs-keyword">new</span> SessionCondition(GlobalStatus.AsyncCommitting));
	} <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
			<span class="hljs-keyword">return</span> findGlobalSessions(<span class="hljs-keyword">new</span> SessionCondition(<span class="hljs-keyword">new</span> GlobalStatus[] {GlobalStatus.CommitRetrying}));
	} <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
			<span class="hljs-keyword">return</span> findGlobalSessions(<span class="hljs-keyword">new</span> SessionCondition(<span class="hljs-keyword">new</span> GlobalStatus[] {GlobalStatus.RollbackRetrying,
					GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
	} <span class="hljs-keyword">else</span> {
		<span class="hljs-comment">// A null taskName corresponds to ROOT_SESSION_MANAGER, which returns transactions in all of the statuses listed below</span>
		<span class="hljs-keyword">return</span> findGlobalSessions(<span class="hljs-keyword">new</span> SessionCondition(<span class="hljs-keyword">new</span> GlobalStatus[] {
				GlobalStatus.UnKnown, GlobalStatus.Begin,
				GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
				GlobalStatus.RollbackRetrying,
				GlobalStatus.TimeoutRollbacking, 
				GlobalStatus.TimeoutRollbackRetrying,
				GlobalStatus.AsyncCommitting}));
	}
}
</code></pre>
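<h4>5. Initializing DefaultCoordinator</h4>
<p>DefaultCoordinator is the core of the transaction coordinator: beginning, committing, and rolling back global transactions, as well as registering, committing, and rolling back branch transactions, are all coordinated by DefaultCoordinator. DefaultCoordinator communicates with the remote TMs and RMs through the RPC server (the NettyRemotingServer created in main, which this article also calls RpcServer) to commit or roll back branch transactions.</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">DefaultCoordinator</span><span class="hljs-params">(ServerMessageSender messageSender)</span> </span>{
	<span class="hljs-comment">// The implementation of the messageSender interface is the RPC server mentioned above (RpcServer)</span>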
	<span class="hljs-keyword">this</span>.messageSender = messageSender;
	
	<span class="hljs-comment">// DefaultCore wraps the concrete implementations of the AT, TCC, Saga, and other transaction modes</span>
	<span class="hljs-keyword">this</span>.core = <span class="hljs-keyword">new</span> DefaultCore(messageSender);
}

<span class="hljs-comment">// init starts five timers, which mainly drive the retry mechanism of distributed transactions.</span>
<span class="hljs-comment">// Because a distributed environment is unreliable, transactions can be left in an intermediate state,</span>
<span class="hljs-comment">// so repeated retries are needed to reach eventual consistency.</span>
<span class="hljs-comment">// Except for undoLogDelete, the timers below all run once per second by default.</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">()</span> </span>{
    <span class="hljs-comment">// Handle transactions in a rollback state that can be retried</span>
	retryRollbacking.scheduleAtFixedRate(() -&gt; {
		<span class="hljs-keyword">try</span> {
			handleRetryRollbacking();
		} <span class="hljs-keyword">catch</span> (Exception e) {
			LOGGER.info(<span class="hljs-string">"Exception retry rollbacking ... "</span>, e);
		}
	}, <span class="hljs-number">0</span>, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
		
    <span class="hljs-comment">// Handle transactions whose phase-two commit can be retried</span>
	retryCommitting.scheduleAtFixedRate(() -&gt; {
		<span class="hljs-keyword">try</span> {
			handleRetryCommitting();
		} <span class="hljs-keyword">catch</span> (Exception e) {
			LOGGER.info(<span class="hljs-string">"Exception retry committing ... "</span>, e);
		}
	}, <span class="hljs-number">0</span>, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    <span class="hljs-comment">// Handle transactions that are committed asynchronously</span>
	asyncCommitting.scheduleAtFixedRate(() -&gt; {
		<span class="hljs-keyword">try</span> {
			handleAsyncCommitting();
		} <span class="hljs-keyword">catch</span> (Exception e) {
			LOGGER.info(<span class="hljs-string">"Exception async committing ... "</span>, e);
		}
	}, <span class="hljs-number">0</span>, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
	<span class="hljs-comment">// Find transactions whose first phase has timed out and set their status to TimeoutRollbacking;</span>
	<span class="hljs-comment">// another timer then performs the rollback</span>
	timeoutCheck.scheduleAtFixedRate(() -&gt; {
		<span class="hljs-keyword">try</span> {
			timeoutCheck();
		} <span class="hljs-keyword">catch</span> (Exception e) {
			LOGGER.info(<span class="hljs-string">"Exception timeout checking ... "</span>, e);
		}
	}, <span class="hljs-number">0</span>, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
	<span class="hljs-comment">// Ask the RMs to delete undo logs according to the configured number of retention days</span>
	undoLogDelete.scheduleAtFixedRate(() -&gt; {
		<span class="hljs-keyword">try</span> {
			undoLogDelete();
		} <span class="hljs-keyword">catch</span> (Exception e) {
			LOGGER.info(<span class="hljs-string">"Exception undoLog deleting ... "</span>, e);
		}
	}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
</code></pre>
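<p>Every timer body above is wrapped in try/catch. One likely reason is the documented behavior of ScheduledExecutorService.scheduleAtFixedRate: if any execution of the task throws, all subsequent executions are suppressed. The sketch below, with hypothetical names and a task that always fails, shows that catching the exception inside the task keeps the schedule alive.</p>

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a periodic task that survives its own failures. */
final class GuardedTimerSketch {

    /** Returns true if a task that fails on every run was still invoked {@code runs} times. */
    static boolean survivesFailures(int runs, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            timer.scheduleAtFixedRate(() -> {
                try {
                    latch.countDown();
                    throw new IllegalStateException("simulated retry failure");
                } catch (Exception e) {
                    // Swallow the failure (a real task would log it) so the next run still happens.
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            // Wait for the expected number of runs, with generous slack for slow machines.
            return latch.await(runs * periodMillis + 2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(survivesFailures(3, 10));
    }
}
```

<p>Without the inner try/catch the first exception would end the schedule after a single run, and a retry timer that silently stops would leave transactions stuck in an intermediate state.</p>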
<h4>6. Initializing NettyRemotingServer</h4>
<p>NettyRemotingServer is a simplified RPC server built on Netty. Its initialization mainly does two things:</p>
<ol>
<li><strong>registerProcessor</strong>: registers the Processors that handle communication with the Client.</li>
<li><strong>super.init()</strong>: initializes Netty and registers the IP and port of the current instance with the registry.</li>
</ol>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">()</span> </span>{
    <span class="hljs-comment">// registry processor</span>
    registerProcessor();
    <span class="hljs-keyword">if</span> (initialized.compareAndSet(<span class="hljs-keyword">false</span>, <span class="hljs-keyword">true</span>)) {
        <span class="hljs-keyword">super</span>.init();
    }
}

<span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">void</span> <span class="hljs-title">registerProcessor</span><span class="hljs-params">()</span> </span>{
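	<span class="hljs-comment">// 1. Register the core ServerOnRequestProcessor, i.e. the Processor for transaction-related requests</span>
	<span class="hljs-comment">// such as beginning or committing a global transaction, registering a branch, and reporting branch status.</span>
	<span class="hljs-comment">// The instance returned by getHandler() is passed to the ServerOnRequestProcessor constructor; this handler</span>
	<span class="hljs-comment">// is the DefaultCoordinator described earlier, the core processing class for distributed transactions.</span>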
	ServerOnRequestProcessor onRequestProcessor =
	    <span class="hljs-keyword">new</span> ServerOnRequestProcessor(<span class="hljs-keyword">this</span>, getHandler());
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
	
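	<span class="hljs-comment">// 2. Register the ServerOnResponseProcessor, which handles the Responses that the Client</span>
	<span class="hljs-comment">// sends back after the Server initiates a request. For example, when the Server asks the</span>
	<span class="hljs-comment">// Client to commit or roll back a branch transaction, the Client returns the commit/rollback result.</span>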
	ServerOnResponseProcessor onResponseProcessor =
	    <span class="hljs-keyword">new</span> ServerOnResponseProcessor(getHandler(), getFutures());
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
	
	<span class="hljs-comment">// 3. Processor for RM registration requests from the Client</span>
	RegRmProcessor regRmProcessor = <span class="hljs-keyword">new</span> RegRmProcessor(<span class="hljs-keyword">this</span>);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
	
	<span class="hljs-comment">// 4. Processor for TM registration requests from the Client</span>
	RegTmProcessor regTmProcessor = <span class="hljs-keyword">new</span> RegTmProcessor(<span class="hljs-keyword">this</span>);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, <span class="hljs-keyword">null</span>);
	
	<span class="hljs-comment">// 5. Processor for heartbeat requests from the Client</span>
	ServerHeartbeatProcessor heartbeatMessageProcessor = <span class="hljs-keyword">new</span> ServerHeartbeatProcessor(<span class="hljs-keyword">this</span>);
	<span class="hljs-keyword">super</span>.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, <span class="hljs-keyword">null</span>);
}
</code></pre>
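<p>The three-argument registerProcessor calls above suggest a lookup table from message type to a processor and the executor that runs it. A minimal sketch of that idea follows. Everything in it is hypothetical (the class ProcessorTableSketch, its Processor interface, and the numeric type codes), and the handling of a null executor, running the processor directly on the calling thread, is an assumption about the intent of passing null for TYPE_REG_CLT and TYPE_HEARTBEAT_MSG rather than something shown in this article.</p>

```java
import java.util.concurrent.Executor;

// Hypothetical sketch, not Seata's implementation.
class ProcessorTableSketch {

    // Stand-in for a remoting processor.
    interface Processor {
        void process(String payload);
    }

    // A processor together with the executor that should run it (null means "run inline").
    private static final class Entry {
        final Processor processor;
        final Executor executor;

        Entry(Processor processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    // Illustrative type codes only; they are not Seata's MessageType values.
    static final int TYPE_GLOBAL_BEGIN = 1;
    static final int TYPE_HEARTBEAT = 2;

    private final Entry[] table = new Entry[16];

    void registerProcessor(int messageType, Processor processor, Executor executor) {
        if (0 > messageType || messageType >= table.length) {
            throw new IllegalArgumentException("unknown message type " + messageType);
        }
        table[messageType] = new Entry(processor, executor);
    }

    // Returns false when no processor is registered for the message type.
    boolean dispatch(int messageType, String payload) {
        if (0 > messageType || messageType >= table.length || table[messageType] == null) {
            return false;
        }
        Entry entry = table[messageType];
        if (entry.executor == null) {
            // Assumed behaviour: cheap messages are handled on the calling (I/O) thread.
            entry.processor.process(payload);
        } else {
            entry.executor.execute(() -> entry.processor.process(payload));
        }
        return true;
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        // Runs tasks synchronously but records that the executor path was taken.
        Executor recording = task -> {
            log.append("[executor]");
            task.run();
        };
        ProcessorTableSketch server = new ProcessorTableSketch();
        server.registerProcessor(TYPE_GLOBAL_BEGIN, p -> log.append("begin:").append(p).append(';'), recording);
        server.registerProcessor(TYPE_HEARTBEAT, p -> log.append("ping:").append(p).append(';'), null);
        server.dispatch(TYPE_GLOBAL_BEGIN, "tx-1");
        server.dispatch(TYPE_HEARTBEAT, "hb");
        System.out.println(log); // prints [executor]begin:tx-1;ping:hb;
    }
}
```

<p>Keeping the executor next to the processor lets each message type choose its own threading model at registration time, so the dispatch path itself stays a single table lookup.</p>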
<p>NettyRemotingServer.init() calls the init method of its base class AbstractNettyRemotingServer, shown below:</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">()</span> </span>{
    <span class="hljs-comment">// super.init() starts a scheduled task that cleans up timed-out RPC requests; it runs every 3 seconds.</span>
    <span class="hljs-keyword">super</span>.init();
    <span class="hljs-comment">// Configure the Netty server and start listening on the port.</span>
    serverBootstrap.start();
}

<span class="hljs-comment">// Implementation of serverBootstrap.start()</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">start</span><span class="hljs-params">()</span> </span>{
    <span class="hljs-comment">// Standard Netty server configuration. Two ChannelHandlers are added to the pipeline,</span>
    <span class="hljs-comment">// ProtocolV1Decoder and ProtocolV1Encoder,</span>
    <span class="hljs-comment">// which are the decoder and encoder for Seata's custom RPC protocol.</span>
    <span class="hljs-keyword">this</span>.serverBootstrap.group(<span class="hljs-keyword">this</span>.eventLoopGroupBoss, <span class="hljs-keyword">this</span>.eventLoopGroupWorker)
        .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
        .option(ChannelOption.SO_REUSEADDR, <span class="hljs-keyword">true</span>)
        .childOption(ChannelOption.SO_KEEPALIVE, <span class="hljs-keyword">true</span>)
        .childOption(ChannelOption.TCP_NODELAY, <span class="hljs-keyword">true</span>)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            <span class="hljs-keyword">new</span> WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(<span class="hljs-keyword">new</span> InetSocketAddress(listenPort))
        .childHandler(<span class="hljs-keyword">new</span> ChannelInitializer&lt;SocketChannel&gt;() {
            <span class="hljs-meta">@Override</span>
            <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">initChannel</span><span class="hljs-params">(SocketChannel ch)</span> </span>{
                ch.pipeline().addLast(<span class="hljs-keyword">new</span> IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), <span class="hljs-number">0</span>, <span class="hljs-number">0</span>))
                    .addLast(<span class="hljs-keyword">new</span> ProtocolV1Decoder())
                    .addLast(<span class="hljs-keyword">new</span> ProtocolV1Encoder());
                <span class="hljs-keyword">if</span> (channelHandlers != <span class="hljs-keyword">null</span>) {
                    addChannelPipelineLast(ch, channelHandlers);
                }

            }
        });

    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// Start listening on the configured port</span>
        ChannelFuture future = <span class="hljs-keyword">this</span>.serverBootstrap.bind(listenPort).sync();
        LOGGER.info(<span class="hljs-string">"Server started, listen port: {}"</span>, listenPort);
        <span class="hljs-comment">// After Netty has started successfully, register this instance with the registry center configured in registry.conf</span>
        RegistryFactory.getInstance().register(<span class="hljs-keyword">new</span> InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(<span class="hljs-keyword">true</span>);
        future.channel().closeFuture().sync();
    } <span class="hljs-keyword">catch</span> (Exception exx) {
        <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> RuntimeException(exx);
    }
}
</code></pre>
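<p>start() registers the instance only after bind(...).sync() has returned, so a server that failed to bind is never advertised to clients. The ordering can be sketched without Netty using a plain java.net.ServerSocket. The class BindThenRegisterSketch and its Registry interface below are hypothetical stand-ins, not Seata APIs, and the loopback address is an assumption made for the demo.</p>

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical sketch, not Seata code.
class BindThenRegisterSketch {

    // Stand-in for a service registry client.
    interface Registry {
        void register(InetSocketAddress address);
    }

    // Binds the port first and registers only if the bind succeeded.
    // Port 0 asks the operating system for any free port.
    static ServerSocket start(int port, Registry registry) {
        ServerSocket server = null;
        try {
            server = new ServerSocket();
            server.bind(new InetSocketAddress(port));
        } catch (IOException e) {
            closeQuietly(server);
            // Bind failed: fail fast and never reach the registration step.
            throw new RuntimeException(e);
        }
        registry.register(new InetSocketAddress("127.0.0.1", server.getLocalPort()));
        return server;
    }

    static void closeQuietly(ServerSocket server) {
        if (server == null) {
            return;
        }
        try {
            server.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close fails in this demo.
        }
    }

    public static void main(String[] args) {
        ServerSocket server = start(0, address -> System.out.println("registered " + address));
        try {
            // The port is already taken, so this second start must fail without registering.
            start(server.getLocalPort(), address -> System.out.println("should not happen"));
        } catch (RuntimeException expected) {
            System.out.println("second bind failed, nothing registered");
        } finally {
            closeQuietly(server);
        }
    }
}
```

<p>Registering before the bind would briefly expose an address that no process is listening on, which is the situation the ordering in start() avoids.</p>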
</section><footer class="footer-container"><div class="footer-body"><div class="copyright"><span>Copyright © 2019 Seata</span></div></div></footer></div></div>
	<script src="https://f.alicdn.com/react/15.4.1/react-with-addons.min.js"></script>
	<script src="https://f.alicdn.com/react/15.4.1/react-dom.min.js"></script>
	<script>
		window.rootPath = '';
  </script>
	<script src="/build/blogDetail.js"></script>
	<script>
    var _hmt = _hmt || [];
    (function() {
      var hm = document.createElement("script");
      hm.src = "https://hm.baidu.com/hm.js?104e73ef0c18b416b27abb23757ed8ee";
      var s = document.getElementsByTagName("script")[0];
      s.parentNode.insertBefore(hm, s);
    })();
    </script>
</body>
</html>
